feat: allow configure other additional columns for connectors #14215
Conversation
on second thought, we are going to use both the partition column and the offset column to record the consumption progress, and deprecate …
generally LGTM
AND timestamp_col IS NOT NULL
AND header_col IS NOT NULL
----
101
How do we know what count to expect here 🤔
Where can I find the input data?
for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done
it will generate messages like:
| key | payload | header |
|---|---|---|
| key1 | {"a": 1} | [(header1, v1), (header2, v2)] |
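For reference, the loop bounds {0..100} produce 101 messages, each with both headers set, which is where the expected count of 101 comes from. A sketch of the corresponding check (the source and column names here are assumptions, not the exact test):

```sql
-- Every produced message carries a header and a timestamp,
-- so all 101 rows pass the IS NOT NULL filters.
SELECT count(*)
FROM additional_columns
WHERE timestamp_col IS NOT NULL
  AND header_col IS NOT NULL;
-- expected: 101
```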
Maybe mention this in a comment to avoid confusion.
name,
id,
DataType::List(get_kafka_header_item_datatype().into()),
AdditionalColumnType::Header,
Should this be DataType::Struct instead?
No, a Kafka header is a list of kvs with schema (varchar, bytes). The list can be empty (i.e. no header) or contain multiple kvs. Here is an example:
for i in {0..100}; do echo "key$i:{\"a\": $i}" | ${KCAT_BIN} -P -b message_queue:29092 -t ${ADDI_COLUMN_TOPIC} -K : -H "header1=v1" -H "header2=v2"; done
kcat will generate 101 messages, each containing a header with two kvs: (header1, v1) and (header2, v2).
Is legacy code for _rw_kafka_timestamp considered?
Good question, I almost forgot it.
Yes, #13707 is in v1.6.0 and will not be in the doc; will notify the doc team.
- Add `_rw_kafka_timestamp` column to messages from Kafka source
- Handle addition of columns and bind primary key columns
- Set connector to backfill mode and enable CDC sharing mode
- Check and add timestamp column before generating column IDs
- Throw error if source does not support PRIMARY KEY constraint
- Bind source watermark based on columns
- Resolve privatelink connection for Kafka source
- Create PbSource object with provided properties
- Import `KAFKA_TIMESTAMP_COLUMN_NAME` and handle legacy column in `trad_source.rs`

Signed-off-by: tabVersion <[email protected]>
// }),
// ),
(
    "header", // type: struct<key varchar, value bytea>[]
I think JSONB is better for storing Kafka headers because it can get (`->`) a value easily.
A related topic was discussed at #13387 (summary at #13387 (comment)).
The problem is that the jsonb type does not support bytes inside.
Good point. 🤣 Thinking...
The downside of struct<key varchar, value bytea>[] is obvious: RW/PG doesn't provide any function to get a value by key. I don't know how users can do that...
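One workaround available today might be to unnest the array and filter on the struct's key field; a rough sketch (source and column names are assumptions):

```sql
-- Look up the value for key 'header1' in a column of type
-- struct<key varchar, value bytea>[]; returns the value as bytea.
SELECT (h).value
FROM (SELECT unnest(header_col) AS h FROM s) t
WHERE (h).key = 'header1';
```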
We can support array_filter() (https://github.com/risingwavelabs/rfcs/pull/69/files#diff-857a6f40f71644499fee9c269c260a570942420de9a0225b059508d02c1fe98bR127-R138) or array_find for it, if there are not too many fields in the array.
Or support a Map datatype...
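Hypothetically, with such a function the lookup could become a one-liner (purely illustrative; neither array_filter nor this lambda syntax is implemented at this point):

```sql
-- Hypothetical array_filter keeping only the kvs whose key matches;
-- the signature and lambda syntax are assumptions based on the RFC.
SELECT array_filter(header_col, |kv| (kv).key = 'header1') FROM s;
```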
maybe lambda can do the work 😈
LGTM for the rest
Oh, I plan to do the refactor in #14384. After this, we will always derive offset and partition columns for sources and tables with a connector, regardless of whether users explicitly include them. The clause just changes the visibility of the two columns.
- Added new source `s10` with columns `v1` and `v2`
- Included a timestamp column `some_ts` in the `s10` source
- Configured `s10` source as a Kafka connector with topic, bootstrap server, and startup mode properties
- Implemented a query to filter rows from `s10` based on a specific timestamp
- Dropped tables `s8` and `s9`
- Removed source `s9`
- Removed source `s10`

Signed-off-by: tabVersion <[email protected]>
I think …
Yes, but the example above works. Let me find out why...
I think it may be because the predicate becomes a FILTER above SOURCE, but is not pushed down into the source.
alright, can you help remove the hardcoded col to prevent a future panic?
can
I hereby agree to the terms of the RisingWave Labs, Inc. Contributor License Agreement.
What's changed and what's your intention?
Following #13707 and the final part of risingwavelabs/rfcs#79.
The syntax will be like the sketch below.
The accepted columns for each connector are listed in https://github.com/risingwavelabs/rfcs/blob/tab/include-key-as/rfcs/0079-include-key-as.md
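A sketch of the proposed syntax based on the RFC (topic and property values are placeholders; the final implementation may differ):

```sql
CREATE SOURCE s (v1 int, v2 varchar)
INCLUDE key AS kafka_key
INCLUDE partition AS kafka_partition
INCLUDE offset AS kafka_offset
INCLUDE timestamp AS some_ts
INCLUDE header AS kafka_header   -- struct<key varchar, value bytea>[]
WITH (
    connector = 'kafka',
    topic = 'my_topic',                           -- placeholder
    properties.bootstrap.server = 'message_queue:29092',
    scan.startup.mode = 'earliest'
) FORMAT PLAIN ENCODE JSON;
```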
Checklist
- ./risedev check (or alias, ./risedev c)

Documentation

Release note
If this PR includes changes that directly affect users or other significant modifications relevant to the community, kindly draft a release note to provide a concise summary of these changes. Please prioritize highlighting the impact these changes will have on users.
described above.
A special note on batch queries on sources:
In the previous implementation, we always inserted a timestamptz column into the catalog for every source with Kafka. But with include timestamp introduced, the semantics are the same, so in this PR we no longer insert that column if the source already includes a timestamp.
A minor change for batch queries:
For a source like the sketch below,
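The original snippet isn't preserved here, so this is a sketch with assumed names: a Kafka source without an explicit timestamp alias, which still gets the legacy _rw_kafka_timestamp column:

```sql
CREATE SOURCE s (x int)
WITH (
    connector = 'kafka',
    topic = 'my_topic',  -- placeholder
    properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;
```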
the query works:
select * from s where _rw_kafka_timestamp > '1977-01-01 00:00:00'
but if an alias is specified, as in the sketch below,
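Again a sketch with assumed names, this time aliasing the timestamp:

```sql
CREATE SOURCE s (x int)
INCLUDE timestamp AS some_ts
WITH (
    connector = 'kafka',
    topic = 'my_topic',  -- placeholder
    properties.bootstrap.server = 'message_queue:29092'
) FORMAT PLAIN ENCODE JSON;
```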
the query should be:
select * from s where some_ts > '1977-01-01 00:00:00'